/*
Copyright 2024 The Spice.ai OSS Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
//! Runs federation integration tests for `MySQL`.
//!
//! Expects a Docker daemon to be running.
use crate::mysql::common::{get_mysql_conn, make_mysql_dataset, start_mysql_docker_container};
use std::sync::Arc;

use super::*;
use app::AppBuilder;
use datafusion_table_providers::sql::arrow_sql_gen::statement::{
    CreateTableBuilder, InsertBuilder,
};
use mysql_async::{prelude::Queryable, Params, Row};
use runtime::Runtime;
use tracing::instrument;

const MYSQL_DOCKER_CONTAINER: &str = "runtime-integration-test-federation-mysql";
const MYSQL_PORT: u16 = 13306;

#[instrument]
async fn init_mysql_db(port: u16) -> Result<(), anyhow::Error> {
    let pool = get_mysql_conn(port)?;
    let mut conn = pool.get_conn().await?;

    tracing::debug!("DROP TABLE IF EXISTS lineitem");
    let _: Vec<Row> = conn
        .exec("DROP TABLE IF EXISTS lineitem", Params::Empty)
        .await?;

    tracing::debug!("Downloading TPCH lineitem...");
    let tpch_lineitem = crate::get_tpch_lineitem().await?;

    let tpch_lineitem_schema = Arc::clone(&tpch_lineitem[0].schema());

    let create_table_stmt = CreateTableBuilder::new(tpch_lineitem_schema, "lineitem").build_mysql();
    tracing::debug!("CREATE TABLE lineitem...");
    let _: Vec<Row> = conn.exec(create_table_stmt, Params::Empty).await?;

    tracing::debug!("INSERT INTO lineitem...");
    let insert_stmt = InsertBuilder::new("lineitem", tpch_lineitem).build_mysql(None)?;
    let _: Vec<Row> = conn.exec(insert_stmt, Params::Empty).await?;
    tracing::debug!("MySQL initialized!");

    Ok(())
}

#[tokio::test]
async fn mysql_federation_push_down() -> Result<(), String> {
    type QueryTests<'a> = Vec<(&'a str, Vec<&'a str>, Option<Box<ValidateFn>>)>;
    let _tracing = init_tracing(Some("integration=debug,info"));
    let running_container = start_mysql_docker_container(MYSQL_DOCKER_CONTAINER, MYSQL_PORT)
        .await
        .map_err(|e| {
            tracing::error!("start_mysql_docker_container: {e}");
            e.to_string()
        })?;
    tracing::debug!("Container started");
    init_mysql_db(MYSQL_PORT).await.map_err(|e| {
        tracing::error!("init_mysql_db: {e}");
        e.to_string()
    })?;
    let app = AppBuilder::new("mysql_federation_push_down")
        .with_dataset(make_mysql_dataset("lineitem", "line", MYSQL_PORT, false))
        .build();

    let df = get_test_datafusion();

    let mut rt = Runtime::builder()
        .with_app(app)
        .with_datafusion(df)
        .build()
        .await;

    // Set a timeout for the test
    tokio::select! {
        () = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
            return Err("Timed out waiting for datasets to load".to_string());
        }
        () = rt.load_components() => {}
    }

    let queries: QueryTests = vec![
        (
            "SELECT * FROM line LIMIT 10",
            vec![
                "+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
                "| plan_type     | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |",
                "+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
                "| logical_plan  | Federated                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |",
                "|               |  Limit: skip=0, fetch=10                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |",
                "|               |   Projection: line.l_orderkey, line.l_partkey, line.l_suppkey, line.l_linenumber, line.l_quantity, line.l_extendedprice, line.l_discount, line.l_tax, line.l_returnflag, line.l_linestatus, line.l_shipdate, line.l_commitdate, line.l_receiptdate, line.l_shipinstruct, line.l_shipmode, line.l_comment                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |",
                "|               |     TableScan: line                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |",
                "| physical_plan | SchemaCastScanExec                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |",
                "|               |   RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |",
                "|               |     VirtualExecutionPlan name=mysql compute_context=host=localhost,port=13306,db=mysqldb,user=root sql=SELECT line.l_orderkey, line.l_partkey, line.l_suppkey, line.l_linenumber, line.l_quantity, line.l_extendedprice, line.l_discount, line.l_tax, line.l_returnflag, line.l_linestatus, line.l_shipdate, line.l_commitdate, line.l_receiptdate, line.l_shipinstruct, line.l_shipmode, line.l_comment FROM line LIMIT 10 rewritten_sql=SELECT `lineitem`.`l_orderkey`, `lineitem`.`l_partkey`, `lineitem`.`l_suppkey`, `lineitem`.`l_linenumber`, `lineitem`.`l_quantity`, `lineitem`.`l_extendedprice`, `lineitem`.`l_discount`, `lineitem`.`l_tax`, `lineitem`.`l_returnflag`, `lineitem`.`l_linestatus`, `lineitem`.`l_shipdate`, `lineitem`.`l_commitdate`, `lineitem`.`l_receiptdate`, `lineitem`.`l_shipinstruct`, `lineitem`.`l_shipmode`, `lineitem`.`l_comment` FROM `lineitem` LIMIT 10 |",
                "|               |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |",
                "+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
            ],
            Some(Box::new(|result_batches| {
                for batch in result_batches {
                    assert_eq!(batch.num_columns(), 16, "num_cols: {}", batch.num_columns());
                    assert_eq!(batch.num_rows(), 10, "num_rows: {}", batch.num_rows());
                }
            })),
        ),
        (
            "SELECT * FROM line ORDER BY line.l_orderkey DESC LIMIT 10",
            vec![
                "+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
                "| plan_type     | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |",
                "+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
                "| logical_plan  | Federated                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |",
                "|               |  Limit: skip=0, fetch=10                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |",
                "|               |   Sort: line.l_orderkey DESC NULLS FIRST                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |",
                "|               |     Projection: line.l_orderkey, line.l_partkey, line.l_suppkey, line.l_linenumber, line.l_quantity, line.l_extendedprice, line.l_discount, line.l_tax, line.l_returnflag, line.l_linestatus, line.l_shipdate, line.l_commitdate, line.l_receiptdate, line.l_shipinstruct, line.l_shipmode, line.l_comment                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |",
                "|               |       TableScan: line                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |",
                "| physical_plan | SchemaCastScanExec                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |",
                "|               |   RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |",
                "|               |     VirtualExecutionPlan name=mysql compute_context=host=localhost,port=13306,db=mysqldb,user=root sql=SELECT line.l_orderkey, line.l_partkey, line.l_suppkey, line.l_linenumber, line.l_quantity, line.l_extendedprice, line.l_discount, line.l_tax, line.l_returnflag, line.l_linestatus, line.l_shipdate, line.l_commitdate, line.l_receiptdate, line.l_shipinstruct, line.l_shipmode, line.l_comment FROM line ORDER BY line.l_orderkey DESC NULLS FIRST LIMIT 10 rewritten_sql=SELECT `lineitem`.`l_orderkey`, `lineitem`.`l_partkey`, `lineitem`.`l_suppkey`, `lineitem`.`l_linenumber`, `lineitem`.`l_quantity`, `lineitem`.`l_extendedprice`, `lineitem`.`l_discount`, `lineitem`.`l_tax`, `lineitem`.`l_returnflag`, `lineitem`.`l_linestatus`, `lineitem`.`l_shipdate`, `lineitem`.`l_commitdate`, `lineitem`.`l_receiptdate`, `lineitem`.`l_shipinstruct`, `lineitem`.`l_shipmode`, `lineitem`.`l_comment` FROM `lineitem` ORDER BY `lineitem`.`l_orderkey` DESC LIMIT 10 |",
                "|               |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |",
                "+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
            ],
            Some(Box::new(|result_batches| {
                for batch in result_batches {
                    assert_eq!(batch.num_columns(), 16, "num_cols: {}", batch.num_columns());
                    assert_eq!(batch.num_rows(), 10, "num_rows: {}", batch.num_rows());
                }
            })),
        ),
    ];

    for (query, expected_plan, validate_result) in queries {
        run_query_and_check_results(&mut rt, query, &expected_plan, validate_result).await?;
    }

    running_container.remove().await.map_err(|e| {
        tracing::error!("running_container.remove: {e}");
        e.to_string()
    })?;

    Ok(())
}

#[tokio::test]
#[allow(clippy::too_many_lines)]
async fn mysql_federation_inner_join_with_acc() -> Result<(), String> {
    type QueryTests<'a> = Vec<(&'a str, Vec<&'a str>, Option<Box<ValidateFn>>)>;
    let _tracing = init_tracing(Some("integration=debug,info"));
    let mysql_port = 13308;

    let running_container = start_mysql_docker_container(
        "runtime-integration-test-federation-inner-join-mysql",
        mysql_port,
    )
    .await
    .map_err(|e| {
        tracing::error!("start_mysql_docker_container: {e}");
        e.to_string()
    })?;
    tracing::debug!("Container started");
    init_mysql_db(mysql_port).await.map_err(|e| {
        tracing::error!("init_mysql_db: {e}");
        e.to_string()
    })?;
    let app = AppBuilder::new("mysql_federation_inner_join_with_accelerated_dataset")
        .with_dataset(make_mysql_dataset("lineitem", "line", mysql_port, false))
        .with_dataset(make_mysql_dataset("lineitem", "acc_line", mysql_port, true))
        .build();

    let df = get_test_datafusion();

    let mut rt = Runtime::builder()
        .with_app(app)
        .with_datafusion(df)
        .build()
        .await;
    // Set a timeout for the test
    tokio::select! {
        () = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
            return Err("Timed out waiting for datasets to load".to_string());
        }
        () = rt.load_components() => {}
    }

    tokio::time::sleep(std::time::Duration::from_secs(1)).await;

    let queries: QueryTests = vec![
        (
            "SELECT * FROM line inner join acc_line on acc_line.l_orderkey = line.l_orderkey LIMIT 10",
            vec![
                "+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
                "| plan_type     | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |",
                "+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
                "| logical_plan  | Limit: skip=0, fetch=10                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |",
                "|               |   Inner Join: line.l_orderkey = acc_line.l_orderkey                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |",
                "|               |     Federated                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |",
                "|               |  Projection: line.l_orderkey, line.l_partkey, line.l_suppkey, line.l_linenumber, line.l_quantity, line.l_extendedprice, line.l_discount, line.l_tax, line.l_returnflag, line.l_linestatus, line.l_shipdate, line.l_commitdate, line.l_receiptdate, line.l_shipinstruct, line.l_shipmode, line.l_comment                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |",
                "|               |   TableScan: line                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |",
                "|               |     TableScan: acc_line projection=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |",
                "| physical_plan | GlobalLimitExec: skip=0, fetch=10                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |",
                "|               |   CoalescePartitionsExec                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |",
                "|               |     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |",
                "|               |       HashJoinExec: mode=Partitioned, join_type=Inner, on=[(l_orderkey@0, l_orderkey@0)]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |",
                "|               |         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |",
                "|               |           RepartitionExec: partitioning=Hash([l_orderkey@0], 3), input_partitions=3                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |",
                "|               |             SchemaCastScanExec                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |",
                "|               |               RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |",
                "|               |                 VirtualExecutionPlan name=mysql compute_context=host=localhost,port=13308,db=mysqldb,user=root sql=SELECT line.l_orderkey, line.l_partkey, line.l_suppkey, line.l_linenumber, line.l_quantity, line.l_extendedprice, line.l_discount, line.l_tax, line.l_returnflag, line.l_linestatus, line.l_shipdate, line.l_commitdate, line.l_receiptdate, line.l_shipinstruct, line.l_shipmode, line.l_comment FROM line rewritten_sql=SELECT `lineitem`.`l_orderkey`, `lineitem`.`l_partkey`, `lineitem`.`l_suppkey`, `lineitem`.`l_linenumber`, `lineitem`.`l_quantity`, `lineitem`.`l_extendedprice`, `lineitem`.`l_discount`, `lineitem`.`l_tax`, `lineitem`.`l_returnflag`, `lineitem`.`l_linestatus`, `lineitem`.`l_shipdate`, `lineitem`.`l_commitdate`, `lineitem`.`l_receiptdate`, `lineitem`.`l_shipinstruct`, `lineitem`.`l_shipmode`, `lineitem`.`l_comment` FROM `lineitem` |",
                "|               |         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |",
                "|               |           RepartitionExec: partitioning=Hash([l_orderkey@0], 3), input_partitions=3                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |",
                "|               |             SchemaCastScanExec                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |",
                "|               |               RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |",
                "|               |                 MemoryExec: partitions=1, partition_sizes=[99]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |",
                "|               |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |",
                "+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
            ],
            Some(Box::new(|result_batches| {
                for batch in result_batches {
                    assert_eq!(batch.num_columns(), 32, "num_cols: {}", batch.num_columns());
                    assert_eq!(batch.num_rows(), 10, "num_rows: {}", batch.num_rows());
                }
            })),
        ),
        (
            "SELECT line.* FROM line inner join acc_line on acc_line.l_orderkey = line.l_orderkey LIMIT 10",
            vec![
                "+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
                "| plan_type     | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |",
                "+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
                "| logical_plan  | Projection: line.l_orderkey, line.l_partkey, line.l_suppkey, line.l_linenumber, line.l_quantity, line.l_extendedprice, line.l_discount, line.l_tax, line.l_returnflag, line.l_linestatus, line.l_shipdate, line.l_commitdate, line.l_receiptdate, line.l_shipinstruct, line.l_shipmode, line.l_comment                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |",
                "|               |   Limit: skip=0, fetch=10                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |",
                "|               |     Inner Join: line.l_orderkey = acc_line.l_orderkey                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |",
                "|               |       Federated                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |",
                "|               |  Projection: line.l_orderkey, line.l_partkey, line.l_suppkey, line.l_linenumber, line.l_quantity, line.l_extendedprice, line.l_discount, line.l_tax, line.l_returnflag, line.l_linestatus, line.l_shipdate, line.l_commitdate, line.l_receiptdate, line.l_shipinstruct, line.l_shipmode, line.l_comment                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |",
                "|               |   TableScan: line                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |",
                "|               |       TableScan: acc_line projection=[l_orderkey]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |",
                "| physical_plan | ProjectionExec: expr=[l_orderkey@0 as l_orderkey, l_partkey@1 as l_partkey, l_suppkey@2 as l_suppkey, l_linenumber@3 as l_linenumber, l_quantity@4 as l_quantity, l_extendedprice@5 as l_extendedprice, l_discount@6 as l_discount, l_tax@7 as l_tax, l_returnflag@8 as l_returnflag, l_linestatus@9 as l_linestatus, l_shipdate@10 as l_shipdate, l_commitdate@11 as l_commitdate, l_receiptdate@12 as l_receiptdate, l_shipinstruct@13 as l_shipinstruct, l_shipmode@14 as l_shipmode, l_comment@15 as l_comment]                                                                                                                                                                                                                                                                                                                                                                              |",
                "|               |   GlobalLimitExec: skip=0, fetch=10                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |",
                "|               |     CoalescePartitionsExec                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |",
                "|               |       CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |",
                "|               |         HashJoinExec: mode=Partitioned, join_type=Inner, on=[(l_orderkey@0, l_orderkey@0)]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |",
                "|               |           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |",
                "|               |             RepartitionExec: partitioning=Hash([l_orderkey@0], 3), input_partitions=3                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |",
                "|               |               SchemaCastScanExec                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |",
                "|               |                 RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |",
                "|               |                   VirtualExecutionPlan name=mysql compute_context=host=localhost,port=13308,db=mysqldb,user=root sql=SELECT line.l_orderkey, line.l_partkey, line.l_suppkey, line.l_linenumber, line.l_quantity, line.l_extendedprice, line.l_discount, line.l_tax, line.l_returnflag, line.l_linestatus, line.l_shipdate, line.l_commitdate, line.l_receiptdate, line.l_shipinstruct, line.l_shipmode, line.l_comment FROM line rewritten_sql=SELECT `lineitem`.`l_orderkey`, `lineitem`.`l_partkey`, `lineitem`.`l_suppkey`, `lineitem`.`l_linenumber`, `lineitem`.`l_quantity`, `lineitem`.`l_extendedprice`, `lineitem`.`l_discount`, `lineitem`.`l_tax`, `lineitem`.`l_returnflag`, `lineitem`.`l_linestatus`, `lineitem`.`l_shipdate`, `lineitem`.`l_commitdate`, `lineitem`.`l_receiptdate`, `lineitem`.`l_shipinstruct`, `lineitem`.`l_shipmode`, `lineitem`.`l_comment` FROM `lineitem` |",
                "|               |           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |",
                "|               |             RepartitionExec: partitioning=Hash([l_orderkey@0], 3), input_partitions=3                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |",
                "|               |               SchemaCastScanExec                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |",
                "|               |                 RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |",
                "|               |                   MemoryExec: partitions=1, partition_sizes=[99]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |",
                "|               |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |",
                "+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
            ],
            Some(Box::new(|result_batches| {
                for batch in result_batches {
                    assert_eq!(batch.num_columns(), 16, "num_cols: {}", batch.num_columns());
                    assert_eq!(batch.num_rows(), 10, "num_rows: {}", batch.num_rows());
                }
            })),
        ),
    ];

    for (query, expected_plan, validate_result) in queries {
        run_query_and_check_results(&mut rt, query, &expected_plan, validate_result).await?;
    }

    running_container.remove().await.map_err(|e| {
        tracing::error!("running_container.remove: {e}");
        e.to_string()
    })?;

    Ok(())
}
